Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a wait_for_samples method to the MovingWindow #1159

Merged
merged 5 commits into from
Feb 12, 2025

Conversation

shsms
Copy link
Contributor

@shsms shsms commented Feb 4, 2025

Closes #967

@shsms shsms requested a review from a team as a code owner February 4, 2025 14:20
@shsms shsms requested review from daniel-zullo-frequenz and removed request for a team February 4, 2025 14:20
@github-actions github-actions bot added part:docs Affects the documentation part:tests Affects the unit, integration and performance (benchmarks) tests part:data-pipeline Affects the data pipeline labels Feb 4, 2025
src/frequenz/sdk/timeseries/_ringbuffer/buffer.py Outdated Show resolved Hide resolved
src/frequenz/sdk/timeseries/_moving_window.py Show resolved Hide resolved
src/frequenz/sdk/timeseries/_moving_window.py Show resolved Hide resolved
@@ -318,6 +320,34 @@ def window(
start, end, force_copy=force_copy, fill_value=fill_value
)

async def wait_for_samples(self, n: int) -> None:
"""Wait until the next `n` samples are available in the MovingWindow.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this is valid samples? If so, I am actually not sure if we want this or something time-based i.e. allow that not all samples are valid. However, for our current use-case this also works since we would set n=1 anyway.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's interesting. valid means that any data was received. If a component is missing data, resampler will send None and that is not a valid value.

If a component is sending only None, should this function return after n Nones are received? I'm guessing it should?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, in many scenarios I wouldn't distinguish between missing or None values. I think it shouldn't return after n Nones but after n new time steps which have at least 1 real value. However, we could leave this also for later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated it to return after n samples are received. Whether they were valid or not needs to be checked with a call to count_valid. I've also updated the docs to state this.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see the confusion, I understood that it triggers when n new output samples have been "received", i.e. there are timestamps in the resulting moving window. But this is about input samples. So even if we receive 100 samples, if these are all older than newest timestamp we wouldn't get any new timestamp in the window but updated data points of older timestamps.

This makes sense to me, would stress that in the doc though, e.g the valid samples part is confusing IMO since this is indeed about the new samples.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe:

       """Wait until the next `n` samples have been received in the MovingWindow.

        This function returns after `n` input samples have been received, without considering
        whether the received samples are valid or which timestamp they have. The validity of 
        the samples in the updated moving window can be verified by calling the
        [`count_valid`][frequenz.sdk.timeseries.MovingWindow.count_valid] method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it would wait until there are n output samples, but some output samples could be nan.

It does consider the timestamps when the samples are received. It expects n "new" samples to be available in the buffer before it returns.

The current tests only cover cases where there is no resampling in the moving window. I'll rectify that.

Copy link
Contributor Author

@shsms shsms Feb 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to clarify that it returns after n new samples in the moving window, and not just input samples, as discussed.

@shsms shsms force-pushed the moving-window-samples-trigger branch from a001bf7 to f94c4fa Compare February 10, 2025 16:33
@shsms shsms requested a review from cwasicki February 10, 2025 16:36
@@ -318,6 +320,34 @@ def window(
start, end, force_copy=force_copy, fill_value=fill_value
)

async def wait_for_samples(self, n: int) -> None:
"""Wait until the next `n` samples are available in the MovingWindow.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see the confusion, I understood that it triggers when n new output samples have been "received", i.e. there are timestamps in the resulting moving window. But this is about input samples. So even if we receive 100 samples, if these are all older than newest timestamp we wouldn't get any new timestamp in the window but updated data points of older timestamps.

This makes sense to me, would stress that in the doc though, e.g the valid samples part is confusing IMO since this is indeed about the new samples.

@@ -318,6 +320,34 @@ def window(
start, end, force_copy=force_copy, fill_value=fill_value
)

async def wait_for_samples(self, n: int) -> None:
"""Wait until the next `n` samples are available in the MovingWindow.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe:

       """Wait until the next `n` samples have been received in the MovingWindow.

        This function returns after `n` input samples have been received, without considering
        whether the received samples are valid or which timestamp they have. The validity of 
        the samples in the updated moving window can be verified by calling the
        [`count_valid`][frequenz.sdk.timeseries.MovingWindow.count_valid] method.

tests/timeseries/test_moving_window.py Show resolved Hide resolved
@shsms shsms requested a review from cwasicki February 11, 2025 08:12
src/frequenz/sdk/timeseries/_moving_window.py Outdated Show resolved Hide resolved
Comment on lines +343 to +346
raise ValueError(
"The number of samples to wait for must be less than or equal to the "
+ f"capacity of the MovingWindow ({self.capacity})."
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could also just silently wait for self.capacity instead, like slicing when you do [:10] for an array that has less than 10 items. It can be more confusing if you wanted to wait for 10 and got 5 instead but it is something at least familiar in Python. Again maybe @cwasicki can hint which approach would be more intuitive for data scientists.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought of it that if n > capacity the n would still be respected, but calculated in terms of time steps since this was triggered. But I think the different understanding here shows already that this could be confusing and we can limit it until we have a case where we need it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, true, best to start with a safe approach, for my behaviour you can still easily get is via mw.wait_for_samples(min(n, mw.capacity)) I guess, so it doesn't look like it is adding too much value.

Comment on lines +354 to +357
async with self._condition_new_sample:
# Every time a new sample is received, this condition gets notified and
# will wake up.
_ = await self._condition_new_sample.wait()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thinking out loud, and I don't even think we need to think about it for this PR, but maybe this could be done more efficiently by reversing the logic, and only set the condition when the counter is set (it could be even be a simple Event in this case). This would mean we would need to save the "waiters" in the instance, and then notify the appropriate waiter, so maybe that's a bit costly too, but I guess the most common case will be having only one or very few waiters. 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The most common case is also with n=1 and resampling_interval=15 minutes.

src/frequenz/sdk/timeseries/_moving_window.py Show resolved Hide resolved
src/frequenz/sdk/timeseries/_moving_window.py Outdated Show resolved Hide resolved
src/frequenz/sdk/timeseries/_moving_window.py Outdated Show resolved Hide resolved
@shsms shsms force-pushed the moving-window-samples-trigger branch 2 times, most recently from f5cb345 to 2155dca Compare February 11, 2025 14:04
It retains the original behaviour of counting all the valid samples in
the buffer when no time range is specified.

Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
It retains the original behaviour of counting all the valid samples in
the buffer when no time range is specified.

Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
The resampler sends `None` values when there is no source data.  In
this case, the values need to be sent to the buffer, so that it can
update its gaps immediately, instead of waiting until when data is
finally available.

Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
@shsms shsms force-pushed the moving-window-samples-trigger branch from 2155dca to a2754f4 Compare February 11, 2025 14:32
@shsms
Copy link
Contributor Author

shsms commented Feb 11, 2025

Also added tests for wait_for_samples in moving windows with a resampler.

@shsms shsms requested a review from llucax February 11, 2025 14:37
cwasicki
cwasicki previously approved these changes Feb 11, 2025
Copy link
Collaborator

@cwasicki cwasicki left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, just minor clarification proposal.

async def wait_for_samples(self, n: int) -> None:
"""Wait until the next `n` samples are available in the MovingWindow.

This function returns after `n` samples are available in the MovingWindow,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would say after n new samples are available

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
@shsms shsms force-pushed the moving-window-samples-trigger branch from a2754f4 to 32e27cb Compare February 12, 2025 10:51
@shsms shsms requested a review from cwasicki February 12, 2025 10:53
@shsms shsms enabled auto-merge February 12, 2025 11:01
@llucax llucax disabled auto-merge February 12, 2025 11:28
Copy link
Contributor

@llucax llucax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. I disabled auto-merge to give @cwasicki the opportunity to approve too, as he knows what's needed best.

@shsms
Copy link
Contributor Author

shsms commented Feb 12, 2025

He had already approved.

@shsms shsms added this pull request to the merge queue Feb 12, 2025
Merged via the queue into frequenz-floss:v1.x.x with commit bceccfc Feb 12, 2025
5 checks passed
@shsms shsms deleted the moving-window-samples-trigger branch February 12, 2025 14:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
part:data-pipeline Affects the data pipeline part:docs Affects the documentation part:tests Affects the unit, integration and performance (benchmarks) tests
Projects
Development

Successfully merging this pull request may close these issues.

[MovingWindow] Add a trigger that fires after having received a fixed number of samples
3 participants